package com.syj.qdp.framework.redis.core.stream;

import cn.hutool.core.util.ReflectUtil;
import cn.hutool.core.util.TypeUtil;
import com.syj.qdp.framework.common.util.json.JsonUtils;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.stream.StreamListener;

import java.lang.reflect.Type;

/**
 * @author Lyon
 */
@Slf4j
public abstract class AbstractStreamMessageListener<T extends StreamMessage> implements StreamListener<String, ObjectRecord<String, String>> {


    private final Class<T> messageType;
    @Getter
    private final String streamKey;
    @Getter
    @Value("spring.application.name")
    private String group;
    @Setter
    private RedisTemplate redisTemplate;

    public AbstractStreamMessageListener() {
        messageType = getMessageClass();
        streamKey = ReflectUtil.newInstance(messageType).getStreamKey();
    }

    @Override
    public void onMessage(ObjectRecord<String, String> record) {
        String value = record.getValue();
        try {
            this.onMessage(JsonUtils.parseObject(value,messageType));
            // 未进行ack或ack失败时，之后可以在pending列表里重新进行消费；
            redisTemplate.opsForStream().acknowledge(group,record);
        } catch (Exception e) {
            log.info("stream 消息处理异常 gruop:[{}] id:[{}] stream:[{}] , message:[{}]",group,record.getId(),record.getStream(),record.getValue(),e);
            throw e;
        }
        // TODO 需要额外考虑以下几个点：
        // 1. 处理异常的情况
        // 2. 发送日志；以及事务的结合
        // 3. 消费日志；以及通用的幂等性
        // 4. 消费失败的重试，https://zhuanlan.zhihu.com/p/60501638
    }

    /**
     * 处理消息
     * @param message 消息
     */
    abstract void onMessage(T message);


    private Class<T> getMessageClass() {
        Type typeArgument = TypeUtil.getTypeArgument(getClass(), 0);
        if (typeArgument == null) {
            throw new IllegalStateException(String.format("类型[%s] 必须设置正确的消息类型",getClass()));
        }
        return (Class<T>) typeArgument;
    }
}
